From 2224656b1795d479f5d95347882aad643327b1ff Mon Sep 17 00:00:00 2001 From: "mjw@wray-m-3.hpl.hp.com" Date: Thu, 8 Jul 2004 12:05:58 +0000 Subject: [PATCH] bitkeeper revision 1.1052 (40ed38a6k6vT2ycbqz1BZA6U9KRVgg) Start of save support in xfrd. --- .rootkeys | 1 - tools/python/xen/xend/XendMigrate.py | 200 +++++++++------- tools/python/xen/xend/packing.py | 329 --------------------------- tools/xfrd/xfrdClient.py | 1 - 4 files changed, 123 insertions(+), 408 deletions(-) delete mode 100644 tools/python/xen/xend/packing.py diff --git a/.rootkeys b/.rootkeys index 66c3c5ef76..f953e63aea 100644 --- a/.rootkeys +++ b/.rootkeys @@ -242,7 +242,6 @@ 40c9c468xzANp6o2D_MeCYwNmOIUsQ tools/python/xen/xend/XendVnet.py 40c9c468x191zetrVlMnExfsQWHxIQ tools/python/xen/xend/__init__.py 40c9c468S2YnCEKmk4ey8XQIST7INg tools/python/xen/xend/encode.py -40e9808elkoRulOo1GxRTp5ulJGVNw tools/python/xen/xend/packing.py 40c9c468DCpMe542varOolW1Xc68ew tools/python/xen/xend/server/SrvBase.py 40c9c468IxQabrKJSWs0aEjl-27mRQ tools/python/xen/xend/server/SrvConsole.py 40c9c4689Io5bxfbYIfRiUvsiLX0EQ tools/python/xen/xend/server/SrvConsoleDir.py diff --git a/tools/python/xen/xend/XendMigrate.py b/tools/python/xen/xend/XendMigrate.py index a4a4183ceb..4d19a772ac 100644 --- a/tools/python/xen/xend/XendMigrate.py +++ b/tools/python/xen/xend/XendMigrate.py @@ -19,79 +19,41 @@ XFRD_PORT = 8002 XFR_PROTO_MAJOR = 1 XFR_PROTO_MINOR = 0 -class Migrate(Protocol): +class Xfrd(Protocol): + """Protocol handler for a connection to the migration/save daemon xfrd. + """ - def __init__(self, minfo): + def __init__(self, xinfo): self.parser = sxp.Parser() - self.minfo = minfo + self.xinfo = xinfo def connectionMade(self): # Send hello. self.request(['xfr.hello', XFR_PROTO_MAJOR, XFR_PROTO_MINOR]) - # Send migrate. - vmconfig = self.minfo.vmconfig() - if not vmconfig: - self.loseConnection() - return - self.request(['xfr.migrate', - self.minfo.src_dom, - vmconfig, - self.minfo.dst_host, - self.minfo.dst_port]) + # Send request. + self.xinfo.request(self) def request(self, req): sxp.show(req, out=self.transport) - self.transport.write(' \n') def loseConnection(self): self.transport.loseConnection() def connectionLost(self, reason): - self.minfo.closed(reason) - - def dispatch(self, val): - op = sxp.name(val) - op = op.replace('.', '_') - if op.startswith('xfr_'): - fn = getattr(self, op, self.unknown) - else: - fn = self.unknown() - fn(val) + self.xinfo.connectionLost(reason) def dataReceived(self, data): self.parser.input(data) if self.parser.ready(): val = self.parser.get_val() - self.dispatch(val) + self.xinfo.dispatch(val) if self.parser.at_eof(): self.loseConnection() - def unknown(self, val): - print 'unknown>', val - def xfr_progress(self, val): - print 'xfr_progress>', val - - def xfr_err(self, val): - # If we get an error with non-zero code the migrate failed. - # An error with code zero indicates hello success. - print 'xfr_err>', val - v = sxp.child(val) - print 'xfr_err>', type(v), v - err = int(sxp.child(val)) - if not err: return - self.minfo.error(err); - self.loseConnection() - - def xfr_ok(self, val): - # An ok indicates migrate completed successfully, and contains - # the new domain id on the remote system. - print 'xfr_ok>', val - dom = int(sxp.child(val)) - self.minfo.ok(dom) - self.loseConnection() - -class MigrateClientFactory(ClientFactory): +class XfrdClientFactory(ClientFactory): + """Factory for clients of the migration/save daemon xfrd. + """ def __init__(self, minfo): #ClientFactory.__init__(self) @@ -110,10 +72,63 @@ class MigrateClientFactory(ClientFactory): def clientConnectionFailed(self, connector, reason): print 'clientConnectionFailed>', 'connector=', connector, 'reason=', reason +class XfrdInfo: + """Abstract class for info about a session with xfrd. + Has subclasses for save and migrate. + """ + + def vmconfig(self): + print 'vmconfig>' + from xen.xend import XendDomain + xd = XendDomain.instance() + + dominfo = xd.domain_get(self.src_dom) + print 'vmconfig>', type(dominfo), dominfo + if dominfo: + val = sxp.to_string(dominfo.sxpr()) + else: + val = None + print 'vmconfig<', 'val=', type(val), val + return val -class XendMigrateInfo: + def error(self, err): + self.state = 'error' - # states: begin, active, failed, succeeded? + def dispatch(self, xfrd, val): + op = sxp.name(val) + op = op.replace('.', '_') + if op.startswith('xfr_'): + fn = getattr(self, op, self.unknown) + else: + fn = self.unknown() + fn(xfrd, val) + + def unknown(self, xfrd, val): + print 'unknown>', val + + def xfr_err(self, xfrd, val): + # If we get an error with non-zero code the migrate failed. + # An error with code zero indicates hello success. + print 'xfr_err>', val + v = sxp.child(val) + print 'xfr_err>', type(v), v + err = int(sxp.child(val)) + if not err: return + self.error(err); + xfrd.loseConnection() + + def xfr_progress(self, val): + print 'xfr_progress>', val + + def xfr_domain_pause(self, val): + print 'xfr__domain_pause>', val + + def xfr_domain_suspend(self, val): + print 'xfr_domain_suspend>', val + +class XendMigrateInfo(XfrdInfo): + """Representation of a migrate in-progress and its interaction with xfrd. + """ def __init__(self, id, dom, host, port): self.id = id @@ -126,12 +141,6 @@ class XendMigrateInfo: self.start = 0 self.deferred = defer.Deferred() - def set_state(self, state): - self.state = state - - def get_state(self): - return self.state - def sxpr(self): sxpr = ['migrate', ['id', self.id], ['state', self.state] ] sxpr_src = ['src', ['host', self.src_host], ['domain', self.src_dom] ] @@ -142,35 +151,72 @@ class XendMigrateInfo: sxpr.append(sxpr_dst) return sxpr - def vmconfig(self): - print 'vmconfig>' - from xen.xend import XendDomain - xd = XendDomain.instance() + def request(self, xfrd): + vmconfig = self.vmconfig() + if not vmconfig: + xfrd.loseConnection() + return + xfrd.request(['xfr.migrate', + self.src_dom, + vmconfig, + self.dst_host, + self.d.dst_port]) + + def xfr_migrate_ok(self, val): + dom = int(sxp.child0(val)) + self.state = 'ok' + self.dst_dom = dom - dominfo = xd.domain_get(self.src_dom) - print 'vmconfig>', type(dominfo), dominfo - if dominfo: - val = sxp.to_string(dominfo.sxpr()) + def connectionLost(self, reason=None): + if self.state =='ok': + eserver.inject('xend.migrate.ok', self.sxpr()) else: - val = None - print 'vmconfig<', 'val=', type(val), val - return val + self.state = 'error' + eserver.inject('xend.migrate.error', self.sxpr()) - def error(self, err): - self.state = 'error' +class XendSaveInfo(XfrdInfo): + """Representation of a save in-progress and its interaction with xfrd. + """ + + def __init__(self, id, dom, file): + self.id = id + self.state = 'begin' + self.src_dom = dom + self.file = file + self.start = 0 + self.deferred = defer.Deferred() + + def sxpr(self): + sxpr = ['save', + ['id', self.id], + ['state', self.state], + ['domain', self.src_dom], + ['file', self.file] ] + return sxpr - def ok(self, dom): + def request(self, xfrd): + vmconfig = self.vmconfig() + if not vmconfig: + xfrd.loseConnection() + return + xfrd.request(['xfr.save', self.src_dom, vmconfig, self.file ]) + + def xfr_save_ok(self, val): + dom = int(sxp.child0(val)) self.state = 'ok' - self.dst_dom = dom - def closed(self, reason=None): + def connectionLost(self, reason=None): if self.state =='ok': - eserver.inject('xend.migrate.ok', self.sxpr()) + eserver.inject('xend.save.ok', self.sxpr()) else: self.state = 'error' - eserver.inject('xend.migrate.error', self.sxpr()) + eserver.inject('xend.save.error', self.sxpr()) + class XendMigrate: + """External api for interaction with xfrd for migrate and save. + Singleton. + """ # Represents migration in progress. # Use log for indications of begin/end/errors? # Need logging of: domain create/halt, migrate begin/end/fail @@ -224,7 +270,7 @@ class XendMigrate: id = self.nextid() info = XendMigrateInfo(id, dom, host, port) self._add_migrate(id, info) - mcf = MigrateClientFactory(info) + mcf = XfrdClientFactory(info) reactor.connectTCP('localhost', XFRD_PORT, mcf) return info diff --git a/tools/python/xen/xend/packing.py b/tools/python/xen/xend/packing.py deleted file mode 100644 index 760af7f5a1..0000000000 --- a/tools/python/xen/xend/packing.py +++ /dev/null @@ -1,329 +0,0 @@ - -# XDR-style packer/unpacker for sxpr. -# -# string -> [STRING] [len:u16] -# atom -> [ATOM] [len:u16] -# int -> [UINT] [value] -# list -> [LIST] {1 elt}* 0 -# null -> [NULL] -# none -> [NONE] -# bool -> [BOOL] { 0:u8 | 1:u8 } -# -# types packed as u16. -# -# So (a b c) -> [LIST] 1 a 1 b 1 c 0 -# () -> [LIST] 0 - -import struct - -try: - from cStringIO import StringIO as _StringIO -except ImportError: - from StringIO import StringIO as _StringIO - -import types - -class Error(Exception): - - def __init__(self, msg): - self.msg = msg - - def __repr__(self): - return repr(self.msg) - - def __str__(self): - return str(self.msg) - - -class ConversionError(Error): - pass - -BOOL_SIZE = 1 -BOOL_FMT = '>B' - -BYTE_SIZE = 1 -BYTE_FMT = '>b' -UBYTE_FMT = '>B' - -SHORT_SIZE = 2 -SHORT_FMT = '>h' -USHORT_FMT = '>H' - -INT_SIZE = 4 -INT_FMT = '>l' -UINT_FMT = '>L' - -NONE_CODE = 0 -NULL_CODE = 1 -INT_CODE = 2 -STRING_CODE = 3 -ATOM_CODE = 4 -BOOL_CODE = 5 -LIST_CODE = 10 - -class Packer: - - def __init__(self, io=None): - self.reset(io=io) - - def reset(self, io=None): - if io is None: - io = _StringIO() - self.io = io - - def get_buffer(self): - return self.io.getvalue() - - def get_io(self): - return self.io - - def struct_pack(self, fmt, x): - try: - self.io.write(struct.pack(fmt, x)) - except struct.error, msg: - raise ConversionError, msg - - def pack_none(self): - pass - - def pack_bool(self, x): - # { '1' | '0' } - print 'bool>', x - if x: - self.io.write('\1') - else: - self.io.write('\0') - - def pack_byte(self, x): - self.struct_pack(BYTE_FMT, x & 0xff) - - def pack_char(self, x): - print 'char>', x - self.io.write(x) - - def pack_ubyte(self, x): - print 'ubyte>', x - self.struct_pack(UBYTE_FMT, x & 0xff) - - def pack_ushort(self, x): - print 'ushort>', x - self.struct_pack(USHORT_FMT, x & 0xffff) - - def pack_short(self, x): - print 'short>', x - self.struct_pack(SHORT_FMT, x & 0xffff) - - def pack_uint(self, x): - print 'uint>', x - self.struct_pack(UINT_FMT, x) - - def pack_int(self, x): - print 'int>', x - self.struct_pack(INT_FMT, x) - - def pack_uhyper(self, x): - print 'uhyper>', x - self.pack_uint(x>>32 & 0xffffffffL) - self.pack_uint(x & 0xffffffffL) - - pack_hyper = pack_uhyper - - def pack_fstring(self, n, x): - print 'fstring>', x - self.io.write(x) - - pack_fopaque = pack_fstring - - def pack_string(self, x): - print 'string>', x - n = len(x) - self.pack_ushort(n) - self.pack_fstring(n, x) - - pack_opaque = pack_string - pack_bytes = pack_string - - def pack_list(self, x, pack_item): - print 'list>', x - # { '1' }* '0' - for item in x: - self.pack_bool(1) - pack_item(item) - self.pack_bool(0) - - def pack_farray(self, x, pack_item): - # * - # Can pass n and check length - but is it worth it? - print 'farray>', list - for item in x: - pack_item(item) - - def pack_array(self, x, pack_item): - # n *n - print 'array>', x - self.pack_uint(len(x)) - self.pack_farray(x, pack_item) - -class Unpacker: - - def __init__(self, data): - self.reset(data) - - def reset(self, data): - if isinstance(data, types.StringType): - data = _StringIO(data) - self.io = data - - def get_bytes(self, n): - if n < 0: - raise ConversionError('negative byte count') - data = self.io.read(n) - return data - - def struct_unpack(self, fmt, n): - data = self.get_bytes(n) - try: - return struct.unpack(fmt, data)[0] - except struct.error, msg: - raise ConversionError, msg - - def unpack_none(self): - return None - - def unpack_bool(self): - return self.struct_unpack(BOOL_FMT, BOOL_SIZE) - - def unpack_char(self): - return self.get_bytes(1)[0] - - def unpack_byte(self): - return self.struct_unpack(BYTE_FMT, BYTE_SIZE) - - def unpack_ubyte(self): - return self.struct_unpack(UBYTE_FMT, BYTE_SIZE) - - def unpack_ushort(self): - return self.struct_unpack(USHORT_FMT, SHORT_SIZE) - - def unpack_short(self): - return self.struct_unpack(SHORT_FMT, SHORT_SIZE) - - def unpack_uint(self): - x = self.struct_unpack(UINT_FMT, UINT_SIZE) - try: - return int(x) - except OverflowError: - return x - - def unpack_int(self): - return self.struct_unpack(INT_FMT, INT_SIZE) - - def unpack_uhyper(self): - hi = self.unpack_uint() - lo = self.unpack_uint() - return long(hi)<<32 | lo - - def unpack_hyper(self): - x = self.unpack_uhyper() - if x >= 0x8000000000000000L: - x = x - 0x10000000000000000L - return x - - def unpack_fstring(self, n): - return self.get_bytes(n) - - unpack_fopaque = unpack_fstring - - def unpack_string(self): - n = self.unpack_ushort() - return self.unpack_fstring(n) - - unpack_opaque = unpack_string - unpack_bytes = unpack_string - - def unpack_list(self, unpack_item): - list = [] - while self.unpack_bool(): - list.append(unpack_item()) - return list - - def unpack_farray(self, n, unpack_item): - list = [] - for i in range(n): - list.append(unpack_item()) - return list - - def unpack_array(self, unpack_item): - n = self.unpack_ushort() - return self.unpack_farray(n, unpack_item) - -class SxpPacker(Packer): - - pack_code = Packer.pack_ushort - - def pack(self, x): - if isinstance(x, types.NoneType): - self.pack_code(NONE_CODE) - self.pack_none() - elif isinstance(x, types.IntType): - self.pack_code(INT_CODE) - self.pack_int(x) - elif isinstance(x, types.StringType): - self.pack_code(STRING_CODE) - self.pack_string(x) - elif isinstance(x, types.ListType): - self.pack_code(LIST_CODE) - self.pack_list(x, self.pack) - else: - raise Error('invalid type ' + str(type(x))) - -class SxpUnpacker(Unpacker): - - unpack_code = Unpacker.unpack_ushort - - def unpack(self): - code = self.unpack_code() - if code == NONE_CODE: - val = self.unpack_none() - elif code == INT_CODE: - val = self.unpack_int() - elif code == BOOL_CODE: - val = self.unpack_bool() - elif code == STRING_CODE: - val = self.unpack_string() - elif code == ATOM_CODE: - val = self.unpack_string() - elif code == LIST_CODE: - val = self.unpack_list(self.unpack) - else: - raise Error('invalid code ' + str(code)) - return val - -def main(): - d = "['vfarm', ['@', ['name', 'vfarm1']], ['memory', 1024], ['image', 'splinux'], ['args', 'root=/dev/nfs ip=dhcp'], [ 1, -1, 1000000]]" - print"> len=", len(d), "d=", d - obj = ['vfarm', ['@', ['name', 'vfarm1']], - ['memory', 1024], - ['image', 'splinux'], - ['args', 'root=/dev/nfs ip=dhcp'], - [ 1, -1, 1000000] ] - print "> obj=", obj - pack = SxpPacker() - pack.pack(obj) - data = pack.get_buffer() - print "> len=", len(data), "data=", data - unpack = SxpUnpacker(data) - obj_unpack = unpack.unpack() - print "> obj=", obj_unpack - #obj = [100,101,102, 999.00234, { 'a': 1, 'b': 2 } ] - #pack.reset() - #pack.pack_item(obj) - #data = pack.get_buffer() - #print "> obj=", obj - #print "> len=", len(data), "data=", data - #unpack.reset(data) - #obj_unpack = unpack.unpack_item() - #print "> obj=", obj_unpack - -if __name__ == "__main__": - main() diff --git a/tools/xfrd/xfrdClient.py b/tools/xfrd/xfrdClient.py index 1f1fc22407..4badf454db 100755 --- a/tools/xfrd/xfrdClient.py +++ b/tools/xfrd/xfrdClient.py @@ -14,7 +14,6 @@ import StringIO sys.path.append("/home/mjw/repos-bk/xeno-unstable.bk/tools/python") import xen.xend.sxp as sxp -from xen.xend.packing import SxpPacker, SxpUnpacker XFRD_PORT = 8002 -- 2.30.2